import _ from 'lodash'
import resolveLambdaTarget from '../../../utils/resolve-lambda-target.js'
import ServerlessError from '../../../../../serverless-error.js'

/**
 * Compiles `stream` function events (DynamoDB Streams and Kinesis streams)
 * into `AWS::Lambda::EventSourceMapping` resources, optionally together with
 * `AWS::Kinesis::StreamConsumer` resources, and extends the default Lambda
 * execution role with the IAM statements needed to read from the streams.
 */
class AwsCompileStreamEvents {
  /**
   * Registers the lifecycle hooks and the configuration schema of the
   * `stream` event.
   *
   * @param {object} serverless - Serverless instance. It has to expose
   *   `getProvider`, `service`, `configSchemaHandler.defineFunctionEvent` and
   *   `_logDeprecation`.
   */
  constructor(serverless) {
    this.serverless = serverless
    this.provider = this.serverless.getProvider('aws')

    this.hooks = {
      // Logs a deprecation when any stream event configures a `consumer`
      // while `provider.kinesis.consumerNamingMode` is not `serviceSpecific`.
      initialize: () => {
        const usesServiceSpecificNamingMode =
          _.get(
            this.serverless,
            'service.provider.kinesis.consumerNamingMode',
          ) === 'serviceSpecific'
        if (usesServiceSpecificNamingMode) return

        // Functions without an `events` list are tolerated here, the same way
        // `compileStreamEvents` tolerates them.
        const hasStreamConsumer = Object.values(
          this.serverless.service.functions || {},
        ).some(({ events }) =>
          (events || []).some(({ stream: eventObject }) =>
            Boolean(eventObject && eventObject.consumer),
          ),
        )

        if (hasStreamConsumer) {
          this.serverless._logDeprecation(
            'KINESIS_CONSUMER_NAME_CONTAINING_SERVICE',
            'Starting with v4.0.0 Kinesis streams naming scheme will change. Adapt to new schema by setting`provider.kinesis.consumerNamingMode` property to `serviceSpecific`.',
          )
        }
      },
      'package:compileEvents': async () => this.compileStreamEvents(),
    }

    // A stream event is either a plain ARN string or an object with the
    // properties listed below.
    this.serverless.configSchemaHandler.defineFunctionEvent('aws', 'stream', {
      anyOf: [
        { $ref: '#/definitions/awsArnString' },
        {
          type: 'object',
          properties: {
            // arn constraints are listed in the anyOf property of this schema
            arn: {},
            type: { enum: ['dynamodb', 'kinesis'] },
            batchSize: { type: 'integer', minimum: 1, maximum: 10000 },
            parallelizationFactor: { type: 'integer', minimum: 1, maximum: 10 },
            startingPosition: {
              enum: ['LATEST', 'TRIM_HORIZON', 'AT_TIMESTAMP'],
            },
            startingPositionTimestamp: { type: 'number' },
            enabled: { type: 'boolean' },
            consumer: {
              anyOf: [{ const: true }, { $ref: '#/definitions/awsArn' }],
            },
            batchWindow: { type: 'integer', minimum: 0, maximum: 300 },
            maximumRetryAttempts: {
              type: 'integer',
              minimum: -1,
              maximum: 10000,
            },
            bisectBatchOnFunctionError: { type: 'boolean' },
            maximumRecordAgeInSeconds: {
              anyOf: [
                { const: -1 },
                { type: 'integer', minimum: 60, maximum: 604800 },
              ],
            },
            functionResponseType: { enum: ['ReportBatchItemFailures'] },
            destinations: {
              type: 'object',
              properties: {
                onFailure: {
                  anyOf: [
                    { $ref: '#/definitions/awsArnString' },
                    {
                      type: 'object',
                      properties: {
                        // arn constraints are listed in the anyOf property of this schema
                        arn: {},
                        type: { enum: ['sns', 'sqs'] },
                      },
                      additionalProperties: false,
                      anyOf: [
                        {
                          // A CloudFormation function as `arn` requires an explicit `type`
                          properties: {
                            arn: { $ref: '#/definitions/awsCfFunction' },
                          },
                          required: ['arn', 'type'],
                        },
                        {
                          properties: {
                            arn: { $ref: '#/definitions/awsArnString' },
                          },
                          required: ['arn'],
                        },
                      ],
                    },
                  ],
                },
              },
              additionalProperties: false,
              required: ['onFailure'],
            },
            tumblingWindowInSeconds: {
              type: 'integer',
              minimum: 0,
              maximum: 900,
            },
            filterPatterns: { $ref: '#/definitions/filterPatterns' },
          },
          additionalProperties: false,
          anyOf: [
            {
              // A CloudFormation function as `arn` requires an explicit `type`
              properties: {
                arn: { $ref: '#/definitions/awsCfFunction' },
              },
              required: ['arn', 'type'],
            },
            {
              properties: {
                arn: { $ref: '#/definitions/awsArnString' },
              },
              required: ['arn'],
            },
          ],
        },
      ],
    })
  }

  /**
   * Walks over all functions of the service and, for every `stream` event,
   * merges an `AWS::Lambda::EventSourceMapping` resource (plus an
   * `AWS::Kinesis::StreamConsumer` resource when `consumer: true` is set on a
   * Kinesis stream) into the compiled CloudFormation template.
   *
   * When the template contains the `IamRoleLambdaExecution` resource, the IAM
   * statements collected for a function are appended to the statements of the
   * first policy of that role.
   *
   * Defaults applied to a mapping: `BatchSize` 10, `StartingPosition`
   * `TRIM_HORIZON`, `Enabled` true.
   *
   * @throws {ServerlessError} When a Kinesis stream uses `startingPosition`
   *   `AT_TIMESTAMP` without a `startingPositionTimestamp`.
   */
  compileStreamEvents() {
    this.serverless.service.getAllFunctions().forEach((functionName) => {
      const functionObj = this.serverless.service.getFunction(functionName)

      if (!functionObj.events) return

      // IAM statements are collected per function; only the ones that end up
      // with at least one resource are added to the execution role.
      const dynamodbStreamStatement = {
        Effect: 'Allow',
        Action: [
          'dynamodb:GetRecords',
          'dynamodb:GetShardIterator',
          'dynamodb:DescribeStream',
          'dynamodb:ListStreams',
        ],
        Resource: [],
      }
      const kinesisStreamStatement = {
        Effect: 'Allow',
        Action: [
          'kinesis:GetRecords',
          'kinesis:GetShardIterator',
          'kinesis:DescribeStream',
          'kinesis:ListStreams',
        ],
        Resource: [],
      }
      const kinesisStreamWithConsumerStatement = {
        Effect: 'Allow',
        Action: [
          'kinesis:GetRecords',
          'kinesis:GetShardIterator',
          'kinesis:DescribeStreamSummary',
          'kinesis:ListShards',
        ],
        Resource: [],
      }
      const kinesisConsumerStatement = {
        Effect: 'Allow',
        Action: ['kinesis:SubscribeToShard'],
        Resource: [],
      }
      const onFailureSnsStatement = {
        Effect: 'Allow',
        Action: ['sns:Publish'],
        Resource: [],
      }
      const onFailureSqsStatement = {
        Effect: 'Allow',
        Action: ['sqs:ListQueues', 'sqs:SendMessage'],
        Resource: [],
      }

      functionObj.events.forEach((event) => {
        if (!event.stream) return

        let EventSourceArn
        let BatchSize = 10
        let ParallelizationFactor
        let StartingPosition = 'TRIM_HORIZON'
        let Enabled = true

        if (typeof event.stream === 'object') {
          EventSourceArn = event.stream.arn
          BatchSize = event.stream.batchSize || BatchSize
          if (event.stream.parallelizationFactor) {
            ParallelizationFactor = event.stream.parallelizationFactor
          }
          StartingPosition = event.stream.startingPosition || StartingPosition

          if (typeof event.stream.enabled !== 'undefined') {
            Enabled = event.stream.enabled
          }
        } else {
          // Short form: the event is the ARN string itself
          EventSourceArn = event.stream
        }

        // Without an explicit `type`, the service segment of the ARN string
        // (`arn:aws:<service>:...`) decides the stream type.
        const streamType = event.stream.type || EventSourceArn.split(':')[2]
        const streamName = (() => {
          if (EventSourceArn['Fn::GetAtt']) {
            return EventSourceArn['Fn::GetAtt'][0]
          }
          if (EventSourceArn['Fn::ImportValue']) {
            return EventSourceArn['Fn::ImportValue']
          }
          if (EventSourceArn.Ref) {
            return EventSourceArn.Ref
          }
          if (EventSourceArn['Fn::Join']) {
            // [0] is the used delimiter, [1] is the array with values
            const name = EventSourceArn['Fn::Join'][1].slice(-1).pop()
            // `split` always yields at least one element, so this also
            // returns the name unchanged when it contains no '/'
            return name.split('/').pop()
          }
          return EventSourceArn.split('/')[1]
        })()

        const streamLogicalId = this.provider.naming.getStreamLogicalId(
          functionName,
          streamType,
          streamName,
        )

        const dependsOn = []
        const functionIamRoleResourceName =
          this.provider.resolveFunctionIamRoleResourceName(functionObj)
        if (functionIamRoleResourceName) {
          dependsOn.push(functionIamRoleResourceName)
        }
        const { targetAlias } = this.serverless.service.functions[functionName]
        if (targetAlias) {
          dependsOn.push(targetAlias.logicalId)
        }

        const streamResource = {
          Type: 'AWS::Lambda::EventSourceMapping',
          DependsOn: dependsOn,
          Properties: {
            BatchSize,
            Enabled,
            EventSourceArn,
            FunctionName: resolveLambdaTarget(functionName, functionObj),
            ParallelizationFactor,
            StartingPosition,
          },
        }

        // add event source ARNs to PolicyDocument statements
        if (streamType === 'dynamodb') {
          dynamodbStreamStatement.Resource.push(EventSourceArn)
        } else if (event.stream.consumer) {
          kinesisStreamWithConsumerStatement.Resource.push(EventSourceArn)
        } else {
          kinesisStreamStatement.Resource.push(EventSourceArn)
        }

        if (event.stream.batchWindow != null) {
          streamResource.Properties.MaximumBatchingWindowInSeconds =
            event.stream.batchWindow
        }

        if (event.stream.maximumRetryAttempts != null) {
          streamResource.Properties.MaximumRetryAttempts =
            event.stream.maximumRetryAttempts
        }

        if (event.stream.bisectBatchOnFunctionError != null) {
          streamResource.Properties.BisectBatchOnFunctionError =
            event.stream.bisectBatchOnFunctionError
        }

        if (event.stream.functionResponseType != null) {
          streamResource.Properties.FunctionResponseTypes = [
            event.stream.functionResponseType,
          ]
        }

        if (event.stream.maximumRecordAgeInSeconds) {
          streamResource.Properties.MaximumRecordAgeInSeconds =
            event.stream.maximumRecordAgeInSeconds
        }

        if (event.stream.tumblingWindowInSeconds != null) {
          streamResource.Properties.TumblingWindowInSeconds =
            event.stream.tumblingWindowInSeconds
        }

        if (event.stream.destinations) {
          const { onFailure } = event.stream.destinations
          const OnFailureDestinationArn =
            typeof onFailure === 'object' ? onFailure.arn : onFailure

          const destinationType =
            onFailure.type || OnFailureDestinationArn.split(':')[2]
          // add on failure destination ARNs to PolicyDocument statements
          if (destinationType === 'sns') {
            onFailureSnsStatement.Resource.push(OnFailureDestinationArn)
          } else {
            onFailureSqsStatement.Resource.push(OnFailureDestinationArn)
          }

          streamResource.Properties.DestinationConfig = {
            OnFailure: {
              Destination: OnFailureDestinationArn,
            },
          }
        }

        if (event.stream.filterPatterns) {
          streamResource.Properties.FilterCriteria = {
            Filters: event.stream.filterPatterns.map((pattern) => ({
              Pattern: JSON.stringify(pattern),
            })),
          }
        }

        const newStreamObject = {
          [streamLogicalId]: streamResource,
        }

        if (event.stream.consumer && streamType === 'kinesis') {
          if (event.stream.consumer === true) {
            // `consumer: true` creates a dedicated stream consumer and points
            // the mapping at it instead of at the stream itself
            const consumerName = this.provider.naming.getStreamConsumerName(
              functionName,
              streamName,
            )
            const consumerResource = {
              Type: 'AWS::Kinesis::StreamConsumer',
              Properties: {
                StreamARN: EventSourceArn,
                ConsumerName: consumerName,
              },
            }
            const consumerLogicalId =
              this.provider.naming.getStreamConsumerLogicalId(consumerName)
            newStreamObject[consumerLogicalId] = consumerResource
            // `DependsOn` is always the array created above
            streamResource.DependsOn.push(consumerLogicalId)
            const consumerArnRef = {
              Ref: consumerLogicalId,
            }
            streamResource.Properties.EventSourceArn = consumerArnRef
            kinesisConsumerStatement.Resource.push(consumerArnRef)
          } else {
            // An existing consumer was referenced by its ARN
            const consumerArn = event.stream.consumer
            streamResource.Properties.EventSourceArn = consumerArn
            kinesisConsumerStatement.Resource.push(consumerArn)
          }
        }

        // The starting timestamp applies to every Kinesis stream, whether or
        // not a stream consumer is configured.
        if (streamType === 'kinesis') {
          if (
            event.stream.startingPosition === 'AT_TIMESTAMP' &&
            !event.stream.startingPositionTimestamp
          ) {
            throw new ServerlessError(
              `You must specify startingPositionTimestamp for function: ${functionName} when startingPosition is AT_TIMESTAMP`,
              'FUNCTION_STREAM_STARTING_POSITION_TIMESTAMP_INVALID',
            )
          }

          if (event.stream.startingPositionTimestamp) {
            streamResource.Properties.StartingPositionTimestamp =
              event.stream.startingPositionTimestamp
          }
        }

        _.merge(
          this.serverless.service.provider.compiledCloudFormationTemplate
            .Resources,
          newStreamObject,
        )
      })

      // update the PolicyDocument statements (if default policy is used)
      const { IamRoleLambdaExecution } =
        this.serverless.service.provider.compiledCloudFormationTemplate
          .Resources
      if (IamRoleLambdaExecution) {
        const statement =
          IamRoleLambdaExecution.Properties.Policies[0].PolicyDocument.Statement
        const collectedStatements = [
          dynamodbStreamStatement,
          kinesisStreamStatement,
          kinesisStreamWithConsumerStatement,
          kinesisConsumerStatement,
          onFailureSnsStatement,
          onFailureSqsStatement,
        ]
        collectedStatements.forEach((collected) => {
          if (collected.Resource.length) {
            statement.push(collected)
          }
        })
      }
    })
  }
}

export default AwsCompileStreamEvents
